@sarutak
Member

@sarutak sarutak commented Dec 14, 2020

What changes were proposed in this pull request?

This PR fixes a race condition that can produce wrong query results, as in the following example.

create temporary view t3 as select * from values
  ('val3a', 6S, 12, 110L, float(15), 20D, 20E2, timestamp '2014-04-04 01:02:00.000', date '2014-04-04'),
  ('val3a', 6S, 12, 10L, float(15), 20D, 20E2, timestamp '2014-05-04 01:02:00.000', date '2014-05-04'),
  ('val1b', 10S, 12, 219L, float(17), 25D, 26E2, timestamp '2014-05-04 01:02:00.000', date '2014-05-04'),
  ('val1b', 10S, 12, 19L, float(17), 25D, 26E2, timestamp '2014-05-04 01:02:00.000', date '2014-05-04'),
  ('val1b', 8S, 16, 319L, float(17), 25D, 26E2, timestamp '2014-06-04 01:02:00.000', date '2014-06-04'),
  ('val1b', 8S, 16, 19L, float(17), 25D, 26E2, timestamp '2014-07-04 01:02:00.000', date '2014-07-04'),
  ('val3c', 17S, 16, 519L, float(17), 25D, 26E2, timestamp '2014-08-04 01:02:00.000', date '2014-08-04'),
  ('val3c', 17S, 16, 19L, float(17), 25D, 26E2, timestamp '2014-09-04 01:02:00.000', date '2014-09-05'),
  ('val1b', null, 16, 419L, float(17), 25D, 26E2, timestamp '2014-10-04 01:02:00.000', null),
  ('val1b', null, 16, 19L, float(17), 25D, 26E2, timestamp '2014-11-04 01:02:00.000', null),
  ('val3b', 8S, null, 719L, float(17), 25D, 26E2, timestamp '2014-05-04 01:02:00.000', date '2014-05-04'),
  ('val3b', 8S, null, 19L, float(17), 25D, 26E2, timestamp '2015-05-04 01:02:00.000', date '2015-05-04')
  as t3(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i);

SELECT (SELECT min(t3d) FROM t3);

spark-sql> SELECT (SELECT min(t3d) FROM t3);
10
Time taken: 0.149 seconds, Fetched 1 row(s)
spark-sql> SELECT (SELECT min(t3d) FROM t3);
10
Time taken: 0.116 seconds, Fetched 1 row(s)
spark-sql> SELECT (SELECT min(t3d) FROM t3);
10
Time taken: 0.088 seconds, Fetched 1 row(s)
spark-sql> SELECT (SELECT min(t3d) FROM t3);
10
Time taken: 0.109 seconds, Fetched 1 row(s)
spark-sql> SELECT (SELECT min(t3d) FROM t3);
0                                                                                  <--------------- wrong result.
Time taken: 0.103 seconds, Fetched 1 row(s)
spark-sql> SELECT (SELECT min(t3d) FROM t3);
10
Time taken: 0.11 seconds, Fetched 1 row(s)

I found that the root cause is a race condition between the subquery thread and the main thread when hash aggregation is executed.

If the subquery thread reaches here before the main thread does, ExprCode#code is consumed by the subquery thread. The main thread then cannot generate the following initialization statements.

           |$isNull = ${ev.isNull};
           |$value = ${ev.value};
         """.stripMargin

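As a result, the variable represented by isNull keeps its default value of false, and the variable represented by value keeps its default value of 0.
This is why min can return a wrong result: the uninitialized buffer is treated as a non-null 0, which is smaller than every value of t3d in the example.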
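
The "consumed once" behavior described above can be illustrated with a minimal, single-threaded sketch. Everything in it is an assumption made for illustration: `SketchExprCode`, `ConsumeOnceSketch`, and the placeholder statements are hypothetical stand-ins, not Spark's real classes or generated code. The two sequential calls only mimic the order in which the subquery thread and the main thread might arrive.

```scala
// Hypothetical stand-in for ExprCode: `code` is mutable and is cleared once emitted.
final class SketchExprCode(var code: String)

object ConsumeOnceSketch {
  // Emit the pending code of each variable, then clear it so it is not emitted twice.
  def evaluate(vars: Seq[SketchExprCode]): String = {
    val emitted = vars.map(_.code).filter(_.nonEmpty).mkString("\n")
    vars.foreach(v => v.code = "")
    emitted
  }

  // Returns (code seen by the first caller, code seen by the second caller).
  def run(): (String, String) = {
    // One buffer variable shared by both callers; the statements are placeholders.
    val shared = Seq(new SketchExprCode("bufIsNull = true;\nbufValue = -1L;"))
    val subqueryThreadCode = evaluate(shared) // arrives first and consumes the code
    val mainThreadCode = evaluate(shared)     // arrives second and finds nothing left
    (subqueryThreadCode, mainThreadCode)
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = run()
    println(s"first caller emits:\n$first")
    println(s"second caller emits nothing: ${second.isEmpty}")
  }
}
```

In this sketch the second caller emits no initialization at all, so in generated code its buffer fields would keep their JVM defaults (false and 0), which matches the symptom reported above.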

Why are the changes needed?

This is a serious correctness issue.

Does this PR introduce any user-facing change?

Yes.

How was this patch tested?

Using a debugger, I confirmed that ExprCode#code for the main thread is no longer consumed by the subquery thread.

@sarutak
Member Author

sarutak commented Dec 14, 2020

cc: @dongjoon-hyun

@github-actions github-actions bot added the SQL label Dec 14, 2020
@dongjoon-hyun
Member

dongjoon-hyun commented Dec 14, 2020

Hi, @sarutak .
Did you see #30765 from @cloud-fan ? This issue might already be fixed there.

At least, let's merge @cloud-fan 's PR first and re-evaluate this PR.

@sarutak
Member Author

sarutak commented Dec 14, 2020

@dongjoon-hyun Ah, I didn't notice that. Thank you for letting me know.

@sarutak sarutak closed this Dec 14, 2020
@SparkQA

SparkQA commented Dec 14, 2020

Test build #132778 has finished for PR 30766 at commit 7d5d142.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

// The variables are used as aggregation buffers and each aggregate function has one or more
// ExprCode to initialize its buffer slots. Only used for aggregation without keys.
private var bufVars: Seq[Seq[ExprCode]] = _
private val bufVar = new ThreadLocal[Seq[Seq[ExprCode]]] {
Contributor


I thought about this too, but my concern is that we have never promised that codegen is thread-safe. There may be more places to fix, and we may break it again in the future.
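
For readers unfamiliar with the approach in the snippet under review, here is a minimal sketch of how a `ThreadLocal` gives each thread its own copy of a value. It is not the code from this PR: `ThreadLocalSketch` and its methods are hypothetical names, and `Seq[String]` stands in for `Seq[Seq[ExprCode]]`.

```scala
object ThreadLocalSketch {
  // Each thread gets its own slot, which starts out empty.
  private val bufVars = new ThreadLocal[Seq[String]] {
    override protected def initialValue(): Seq[String] = Seq.empty
  }

  // Sets the value inside a new thread and returns what that thread reads back.
  def setAndReadInNewThread(values: Seq[String]): Seq[String] = {
    var seen: Seq[String] = Seq.empty
    val t = new Thread(new Runnable {
      def run(): Unit = {
        bufVars.set(values)
        seen = bufVars.get()
      }
    })
    t.start()
    t.join() // join() makes the write to `seen` visible to the caller
    seen
  }

  // Reads the slot of the calling thread, which the other thread never touched.
  def readInCurrentThread(): Seq[String] = bufVars.get()

  def main(args: Array[String]): Unit = {
    println(setAndReadInNewThread(Seq("bufIsNull", "bufValue")))
    println(readInCurrentThread().isEmpty)
  }
}
```

A value set in one thread is invisible to the others, so one thread cannot consume another thread's state held this way. As the comment above notes, this isolates only the one field that is wrapped; any other state shared during codegen would remain unprotected.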
